home *** CD-ROM | disk | FTP | other *** search
/ PC World Komputer 2010 April / PCWorld0410.iso / hity wydania / Ubuntu 9.10 PL / karmelkowy-koliberek-9.10-netbook-remix-PL.iso / casper / filesystem.squashfs / usr / share / pyshared / coherence / transcoder.py < prev   
Text File  |  2009-05-12  |  13KB  |  390 lines

  1. # -*- coding: utf-8 -*-
  2.  
  3. # Licensed under the MIT license
  4. # http://opensource.org/licenses/mit-license.php
  5.  
  6. # Copyright 2008, Frank Scholz <coherence@beebits.net>
  7.  
  8. """ transcoder classes to be used in combination with
  9.     a Coherence MediaServer
  10.  
  11.     using GStreamer pipelines for the actually work
  12.     and feeding the output into a http response
  13. """
  14.  
  15. import pygst
  16. pygst.require('0.10')
  17. import gst
  18. import gobject
  19. gobject.threads_init ()
  20.  
  21. import urllib
  22.  
  23. from twisted.web import resource, server
  24.  
  25. from coherence import log
  26.  
  27. import struct
  28.  
  29. class FakeTransformer(gst.Element, log.Loggable):
  30.     logCategory = 'faker_datasink'
  31.  
  32.     _sinkpadtemplate = gst.PadTemplate ("sinkpadtemplate",
  33.                                         gst.PAD_SINK,
  34.                                         gst.PAD_ALWAYS,
  35.                                         gst.caps_new_any())
  36.  
  37.     _srcpadtemplate =  gst.PadTemplate ("srcpadtemplate",
  38.                                         gst.PAD_SRC,
  39.                                         gst.PAD_ALWAYS,
  40.                                         gst.caps_new_any())
  41.  
  42.     def __init__(self,destination=None,request=None):
  43.         gst.Element.__init__(self)
  44.         self.sinkpad = gst.Pad(self._sinkpadtemplate, "sink")
  45.         self.srcpad = gst.Pad(self._srcpadtemplate, "src")
  46.         self.add_pad(self.sinkpad)
  47.         self.add_pad(self.srcpad)
  48.  
  49.         self.sinkpad.set_chain_function(self.chainfunc)
  50.  
  51.         self.buffer = ''
  52.         self.buffer_size = 0
  53.         self.proxy = False
  54.         self.got_new_segment = False
  55.         self.closed = False
  56.  
  57.     def get_fake_header(self):
  58.         return struct.pack(">L4s", 32, 'ftyp') + \
  59.             "mp42\x00\x00\x00\x00mp42mp41isomiso2"
  60.  
  61.     def chainfunc(self, pad, buffer):
  62.         if self.proxy:
  63.             # we are in proxy mode already
  64.             self.srcpad.push(buffer)
  65.             return gst.FLOW_OK
  66.  
  67.         self.buffer = self.buffer + buffer.data
  68.         if not self.buffer_size:
  69.             try:
  70.                 self.buffer_size, a_type = struct.unpack(">L4s", self.buffer[:8])
  71.             except:
  72.                 return gst.FLOW_OK
  73.  
  74.         if len(self.buffer) < self.buffer_size:
  75.             # we need to buffer more
  76.             return gst.FLOW_OK
  77.  
  78.         buffer = self.buffer[self.buffer_size:]
  79.         fake_header = self.get_fake_header()
  80.         n_buf = gst.Buffer(fake_header + buffer)
  81.         self.proxy = True
  82.         self.srcpad.push(n_buf)
  83.  
  84.         return gst.FLOW_OK
  85.  
  86. gobject.type_register(FakeTransformer)
  87.  
  88. class DataSink(gst.Element, log.Loggable):
  89.  
  90.     logCategory = 'transcoder_datasink'
  91.  
  92.     _sinkpadtemplate = gst.PadTemplate ("sinkpadtemplate",
  93.                                         gst.PAD_SINK,
  94.                                         gst.PAD_ALWAYS,
  95.                                         gst.caps_new_any())
  96.  
  97.     def __init__(self,destination=None,request=None):
  98.         gst.Element.__init__(self)
  99.         self.sinkpad = gst.Pad(self._sinkpadtemplate, "sink")
  100.         self.add_pad(self.sinkpad)
  101.  
  102.         self.sinkpad.set_chain_function(self.chainfunc)
  103.         self.sinkpad.set_event_function(self.eventfunc)
  104.         self.destination = destination
  105.         self.request = request
  106.  
  107.         if self.destination != None:
  108.             self.destination = open(self.destination, 'wb')
  109.         self.buffer = ''
  110.         self.data_size = 0
  111.         self.got_new_segment = False
  112.         self.closed = False
  113.  
  114.     def chainfunc(self, pad, buffer):
  115.         if self.closed == True:
  116.             return gst.FLOW_OK
  117.         if self.destination != None:
  118.             self.destination.write(buffer.data)
  119.         elif self.request != None:
  120.             self.buffer += buffer.data
  121.             if len(self.buffer) > 200000:
  122.                 self.request.write(self.buffer)
  123.                 self.buffer = ''
  124.         else:
  125.             self.buffer += buffer.data
  126.  
  127.         self.data_size += buffer.size
  128.         return gst.FLOW_OK
  129.  
  130.     def eventfunc(self, pad, event):
  131.         if event.type == gst.EVENT_NEWSEGMENT:
  132.             if self.got_new_segment == False:
  133.                 self.got_new_segment = True
  134.             else:
  135.                 self.closed = True
  136.         elif event.type == gst.EVENT_EOS:
  137.             if self.destination != None:
  138.                 self.destination.close()
  139.             elif self.request != None:
  140.                 if len(self.buffer) > 0:
  141.                     self.request.write(self.buffer)
  142.                 self.request.finish()
  143.         return True
  144.  
  145.  
  146. gobject.type_register(DataSink)
  147.  
  148. class GStreamerPipeline(resource.Resource, log.Loggable):
  149.     logCategory = 'gstreamer'
  150.     addSlash = True
  151.  
  152.     def __init__(self,pipeline,mimetype):
  153.         self.uri = pipeline
  154.         self.contentType = mimetype
  155.         resource.Resource.__init__(self)
  156.  
  157.     def start(self,request=None):
  158.         self.info("GStreamerPipeline start %r %r" % (request,self.uri))
  159.         self.pipeline = gst.parse_launch(
  160.                             self.uri)
  161.         sink = DataSink(request=request)
  162.         self.pipeline.add(sink)
  163.         enc = self.pipeline.get_by_name('enc')
  164.         enc.link(sink)
  165.         self.pipeline.set_state(gst.STATE_PLAYING)
  166.  
  167.         d = request.notifyFinish()
  168.         d.addBoth(self.requestFinished)
  169.  
  170.     def getChild(self, name, request):
  171.         self.info('getChild %s, %s' % (name, request))
  172.         return self
  173.  
  174.     def render_GET(self,request):
  175.         self.info('render GET %r' % (request))
  176.         request.setResponseCode(200)
  177.         if hasattr(self,'contentType'):
  178.             request.setHeader('Content-Type', self.contentType)
  179.         request.write('')
  180.  
  181.         headers = request.getAllHeaders()
  182.         if('connection' in headers and
  183.            headers['connection'] == 'close'):
  184.             pass
  185.  
  186.         self.start(request)
  187.         return server.NOT_DONE_YET
  188.  
  189.     def render_HEAD(self,request):
  190.         self.info('render HEAD %r' % (request))
  191.         request.setResponseCode(200)
  192.         request.setHeader('Content-Type', self.contentType)
  193.         request.write('')
  194.  
  195.     def requestFinished(self,result):
  196.         self.info("requestFinished %r" % result)
  197.         """ we need to find a way to destroy the pipeline here
  198.         """
  199.         #from twisted.internet import reactor
  200.         #reactor.callLater(0, self.pipeline.set_state, gst.STATE_NULL)
  201.         gobject.idle_add(self.cleanup)
  202.  
  203.     def on_message(self,bus,message):
  204.         t = message.type
  205.         print "on_message", t
  206.         if t == gst.MESSAGE_ERROR:
  207.             #err, debug = message.parse_error()
  208.             #print "Error: %s" % err, debug
  209.             self.cleanup()
  210.         elif t == gst.MESSAGE_EOS:
  211.             self.cleanup()
  212.  
  213.     def cleanup(self):
  214.         self.pipeline.set_state(gst.STATE_NULL)
  215.  
  216.  
  217. class BaseTranscoder(resource.Resource, log.Loggable):
  218.     logCategory = 'transcoder'
  219.     addSlash = True
  220.  
  221.     def __init__(self,uri,destination=None):
  222.         self.info('uri %s %r' % (uri, type(uri)))
  223.         if uri[:7] not in ['file://','http://']:
  224.             uri = 'file://' + urllib.quote(uri)   #FIXME
  225.         self.source = uri
  226.         self.destination = destination
  227.         resource.Resource.__init__(self)
  228.  
  229.     def getChild(self, name, request):
  230.         self.info('getChild %s, %s' % (name, request))
  231.         return self
  232.  
  233.     def render_GET(self,request):
  234.         self.info('render GET %r' % (request))
  235.         request.setResponseCode(200)
  236.         if hasattr(self,'contentType'):
  237.             request.setHeader('Content-Type', self.contentType)
  238.         request.write('')
  239.  
  240.         headers = request.getAllHeaders()
  241.         if('connection' in headers and
  242.            headers['connection'] == 'close'):
  243.             pass
  244.  
  245.         self.start(request)
  246.         return server.NOT_DONE_YET
  247.  
  248.     def render_HEAD(self,request):
  249.         self.info('render HEAD %r' % (request))
  250.         request.setResponseCode(200)
  251.         request.setHeader('Content-Type', self.contentType)
  252.         request.write('')
  253.  
  254.     def requestFinished(self,result):
  255.         self.info("requestFinished %r" % result)
  256.         """ we need to find a way to destroy the pipeline here
  257.         """
  258.         #from twisted.internet import reactor
  259.         #reactor.callLater(0, self.pipeline.set_state, gst.STATE_NULL)
  260.         gobject.idle_add(self.cleanup)
  261.  
  262.     def on_message(self,bus,message):
  263.         t = message.type
  264.         print "on_message", t
  265.         if t == gst.MESSAGE_ERROR:
  266.             #err, debug = message.parse_error()
  267.             #print "Error: %s" % err, debug
  268.             self.cleanup()
  269.         elif t == gst.MESSAGE_EOS:
  270.             self.cleanup()
  271.  
  272.     def cleanup(self):
  273.         self.pipeline.set_state(gst.STATE_NULL)
  274.  
  275.  
  276. class PCMTranscoder(BaseTranscoder):
  277.     contentType = 'audio/L16;rate=44100;channels=2'
  278.  
  279.     def start(self,request=None):
  280.         self.info("PCMTranscoder start %r %r" % (request,self.source))
  281.         self.pipeline = gst.parse_launch(
  282.             "%s ! decodebin ! audioconvert name=conv" % self.source)
  283.  
  284.         conv = self.pipeline.get_by_name('conv')
  285.         caps = gst.Caps("audio/x-raw-int,rate=44100,endianness=4321,channels=2,width=16,depth=16,signed=true")
  286.         filter = gst.element_factory_make("capsfilter", "filter")
  287.         filter.set_property("caps", caps)
  288.         self.pipeline.add(filter)
  289.         conv.link(filter)
  290.  
  291.         sink = DataSink(destination=self.destination,request=request)
  292.         self.pipeline.add(sink)
  293.         filter.link(sink)
  294.         self.pipeline.set_state(gst.STATE_PLAYING)
  295.  
  296.         d = request.notifyFinish()
  297.         d.addBoth(self.requestFinished)
  298.  
  299.  
  300. class WAVTranscoder(BaseTranscoder):
  301.  
  302.     contentType = 'audio/x-wav'
  303.  
  304.     def start(self,request=None):
  305.         self.info("start %r", request)
  306.         self.pipeline = gst.parse_launch(
  307.             "%s ! decodebin ! audioconvert ! wavenc name=enc" % self.source)
  308.         enc = self.pipeline.get_by_name('enc')
  309.         sink = DataSink(destination=self.destination,request=request)
  310.         self.pipeline.add(sink)
  311.         enc.link(sink)
  312.         #bus = self.pipeline.get_bus()
  313.         #bus.connect('message', self.on_message)
  314.         self.pipeline.set_state(gst.STATE_PLAYING)
  315.  
  316.         d = request.notifyFinish()
  317.         d.addBoth(self.requestFinished)
  318.  
  319.  
  320. class MP3Transcoder(BaseTranscoder):
  321.  
  322.     contentType = 'audio/mpeg'
  323.  
  324.     def start(self,request=None):
  325.         self.info("start %r", request)
  326.         self.pipeline = gst.parse_launch(
  327.             "%s ! decodebin ! audioconvert ! lame name=enc" % self.source)
  328.         enc = self.pipeline.get_by_name('enc')
  329.         sink = DataSink(destination=self.destination,request=request)
  330.         self.pipeline.add(sink)
  331.         enc.link(sink)
  332.         self.pipeline.set_state(gst.STATE_PLAYING)
  333.  
  334.         d = request.notifyFinish()
  335.         d.addBoth(self.requestFinished)
  336.  
  337.  
  338. class MP4Transcoder(BaseTranscoder):
  339.     """ Only works if H264 inside Quicktime/MP4 container is input
  340.         Source has to be a valid uri
  341.     """
  342.     contentType = 'video/mp4'
  343.  
  344.     def start(self,request=None):
  345.         self.info("start %r", request)
  346.         self.pipeline = gst.parse_launch(
  347.             "%s ! qtdemux name=d ! queue ! h264parse ! mp4mux name=mux d. ! queue ! mux." % self.source)
  348.         mux = self.pipeline.get_by_name('mux')
  349.         sink = DataSink(destination=self.destination,request=request)
  350.         self.pipeline.add(sink)
  351.         mux.link(sink)
  352.         self.pipeline.set_state(gst.STATE_PLAYING)
  353.  
  354.         d = request.notifyFinish()
  355.         d.addBoth(self.requestFinished)
  356.  
  357.  
  358. class JPEGThumbTranscoder(BaseTranscoder):
  359.     """ should create a valid thumbnail according to the DLNA spec
  360.         neither width nor height must exceed 160px
  361.     """
  362.     contentType = 'image/jpeg'
  363.  
  364.     def start(self,request=None):
  365.         self.info("start %r", request)
  366.         """ what we actually want here is a pipeline that calls
  367.             us when it knows about the size of the original image,
  368.             and allows us now to adjust the caps-filter with the
  369.             calculated values for width and height
  370.  
  371.             new_width = 160
  372.             new_height = 160
  373.             if original_width > 160:
  374.                 new_heigth = int(float(original_height) * (160.0/float(original_width)))
  375.                 if new_height > 160:
  376.                     new_width = int(float(new_width) * (160.0/float(new_height)))
  377.             elif original_height > 160:
  378.                 new_width = int(float(original_width) * (160.0/float(original_height)))
  379.         """
  380.         self.pipeline = gst.parse_launch(
  381.             "%s ! jpegdec ! videoscale ! video/x-raw-yuv,width=160,height=160 ! jpegenc name=enc" % self.source)
  382.         enc = self.pipeline.get_by_name('enc')
  383.         sink = DataSink(destination=self.destination,request=request)
  384.         self.pipeline.add(sink)
  385.         enc.link(sink)
  386.         self.pipeline.set_state(gst.STATE_PLAYING)
  387.  
  388.         d = request.notifyFinish()
  389.         d.addBoth(self.requestFinished)
  390.